Avro

本文为您介绍Avro格式的使用示例、配置选项和类型映射。

背景信息

Avro格式允许基于Avro的结构读写Avro数据。当前,Avro结构是基于表结构推导而来的。支持Avro格式的连接器包括消息队列KafkaUpsert Kafka对象存储OSS

使用示例

利用Kafka以及Avro格式构建表的示例如下。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)

配置选项

参数

是否必选

默认值

类型

描述

format

(none)

String

声明使用的格式。使用Avro格式时,参数取值为avro。

avro.codec

(none)

String

指定Avro压缩的编解码器,仅适用于连接器为Filesystem的情况。参数取值如下:

  • snappy(默认值)

  • null

  • deflate

  • bzip2

  • xz

类型映射

FlinkAvro的数据类型的映射关系如下。

Flink SQL类型

Avro类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

bytes

DECIMAL

fixed

说明

带有精度的十进制数。

TINYINT

int

SMALLINT

int

INT

int

BIGINT

long

FLOAT

float

DOUBLE

double

DATE

int

说明

日期。

TIME

int

说明

以毫秒为单位的时间。

TIMESTAMP

long

说明

以毫秒为单位的时间戳。

ARRAY

array

MAP

说明

元素必须是STRING、CHARVARCHAR类型。

map

MULTISET

说明

元素必须是STRING、CHARVARCHAR类型。

map

ROW

record

除了以上类型,Flink支持读取和写入nullable的类型。Flinknullable的类型映射到Avro union(something, null),其中something是从Flink类型转换的Avro类型。

说明

关于Avro类型的信息,详情请见Avro 规范